package net.wlm.jtp.dws.log.function;


import net.wlm.jtp.dws.log.bean.PageViewBean;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * 将pageLog数据封装实体类
 */

public class PageViewWindowFunction implements WindowFunction<PageViewBean, String, String, TimeWindow> {
    // 格式化实例
    private FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

    public void apply(String key, TimeWindow window, Iterable<PageViewBean> input, Collector<String> out){
        // 获取窗口时间
        String windowStartTime = format.format(window.getStart());
        String windowEndTime = format.format(window.getEnd());

        // 计算数据
        Long pvCount = 0L;
        Long uvCount = 0L;
        Long pvDuringTime = 0L;
        Long sessionCount = 0L;
        for (PageViewBean bean : input) {
            pvCount += bean.getPvCount();
            uvCount += bean.getUvCount();
            pvDuringTime += bean.getPvDuringTime();
            sessionCount += bean.getSessionCount();
        }

        // 输出
        String output = windowStartTime + "," +  windowEndTime + "," +
                key + "," +
                pvCount + "," + pvDuringTime + "," +  uvCount + "," +   sessionCount + ","
                + System.currentTimeMillis();
        out.collect(output);
    }
}